/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: we_ddlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $

#include <unistd.h>
#include "boost/filesystem/operations.hpp"
#include "boost/filesystem/path.hpp"
#include "boost/scoped_ptr.hpp"
using namespace std;

#include "bytestream.h"
using namespace messageqcpp;

#include "we_messages.h"
#include "we_message_handlers.h"
#include "we_ddlcommon.h"
#include "we_ddlcommandproc.h"
#include "ddlpkg.h"
using namespace ddlpackage;
#include <ctime>
#include "dataconvert.h"
using namespace dataconvert;
// #include "we_brm.h"
namespace fs = boost::filesystem;
#include "cacheutils.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
using namespace idbdatafile;

using namespace execplan;

namespace WriteEngine
{
WE_DDLCommandProc::WE_DDLCommandProc()
{
  filesPerColumnPartition = 8;
  extentsPerSegmentFile = 1;
  dbrootCnt = 1;
  extentRows = 0x800000;
  config::Config* cf = config::Config::makeConfig();
  string fpc = cf->getConfig("ExtentMap", "FilesPerColumnPartition");

  if (fpc.length() != 0)
    filesPerColumnPartition = cf->uFromText(fpc);

  // MCOL-4685: remove the option to set more than 2 extents per file (ExtentsPreSegmentFile).
  extentsPerSegmentFile = DEFAULT_EXTENTS_PER_SEGMENT_FILE;

  string dbct = cf->getConfig("SystemConfig", "DBRootCount");

  if (dbct.length() != 0)
    dbrootCnt = cf->uFromText(dbct);
}
WE_DDLCommandProc::WE_DDLCommandProc(const WE_DDLCommandProc& rhs)
{
}
WE_DDLCommandProc::~WE_DDLCommandProc()
{
}
uint8_t WE_DDLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err)
{
  uint32_t columnOid, sessionID;
  uint64_t nextVal;
  int rc = 0;
  bs >> columnOid;
  bs >> nextVal;
  bs >> sessionID;
  uint16_t dbRoot;
  BRM::OID_t oid = 1021;
  fDbrm.getSysCatDBRoot(oid, dbRoot);
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;
  oids[columnOid] = columnOid;

  // oidsToFlush.push_back(columnOid);
  if (idbdatafile::IDBPolicy::useHdfs())
    fWEWrapper.startTransaction(sessionID);

  rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot);

  if (rc != 0)
  {
    err = "Error in WE::updateNextValue";
    rc = 1;
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    fWEWrapper.flushDataFiles(rc, sessionID, oids);
    fWEWrapper.confirmTransaction(sessionID);

    if (rc == 0)
      fWEWrapper.endTransaction(sessionID, true);
    else
      fWEWrapper.endTransaction(sessionID, false);
  }

  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}

uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  int txnID, tableOID, tableAUXColumnOID;
  uint32_t tableWithAutoi;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> tmp32;
  tableOID = tmp32;
  bs >> tmp32;
  tableAUXColumnOID = tmp32;
  bs >> tmp32;
  tableWithAutoi = tmp32;
  bs >> tmp32;
  uint16_t dbroot = tmp32;
  ddlpackage::TableDef tableDef;
  tableDef.unserialize(bs);

  WriteEngine::ColTuple colTuple;
  WriteEngine::ColStruct colStruct;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::ColTupleList colTuples;
  WriteEngine::dictStr dctColTuples;
  WriteEngine::DctnryStruct dctnryStruct;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DictStrList dctnryValueList;
  WriteEngine::RIDList ridList;
  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::ROPair sysTableROPair;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  ColumnList columns;
  ColumnList::const_iterator column_iterator;
  DDLColumn column;
  int error = 0;

  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSTABLE_TABLE;

  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  std::map<uint32_t, uint32_t> oids;

  // std::vector<BRM::OID_t>  oidsToFlush;
  try
  {
    sysTableROPair = systemCatalogPtr->tableRID(tableName);

    getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);

    column_iterator = columns.begin();
    NullString tmpStr;

    while (column_iterator != columns.end())
    {
      column = *column_iterator;
      boost::to_lower(column.tableColName.column);

      tmpStr.dropString();

      if (TABLENAME_COL == column.tableColName.column)
      {
        std::string tablename = tableDef.fQualifiedName->fName;
        colTuple.data = tablename;
        tmpStr.assign(tablename);
      }
      else if (SCHEMA_COL == column.tableColName.column)
      {
        std::string schema = tableDef.fQualifiedName->fSchema;
        colTuple.data = schema;
        tmpStr.assign(schema);
      }
      else if (OBJECTID_COL == column.tableColName.column)
      {
        colTuple.data = tableOID;
      }
      else if (AUXCOLUMNOID_COL == column.tableColName.column)
      {
        colTuple.data = tableAUXColumnOID;
      }
      else if (CREATEDATE_COL == column.tableColName.column)
      {
        time_t t;
        struct tm tmp;
        Date aDay;

        t = time(NULL);
        gmtime_r(&t, &tmp);
        aDay.year = tmp.tm_year + 1900;
        aDay.month = tmp.tm_mon + 1;
        aDay.day = tmp.tm_mday;

        colTuple.data = *(reinterpret_cast<int*>(&aDay));
      }
      else if (INIT_COL == column.tableColName.column)
      {
        colTuple.data = column.colType.getNullValueForType();
      }
      else if (NEXT_COL == column.tableColName.column)
      {
        colTuple.data = column.colType.getNullValueForType();
      }
      else if (AUTOINC_COL == column.tableColName.column)
      {
        colTuple.data = tableWithAutoi;
      }
      else
      {
        colTuple.data = column.colType.getNullValueForType();
      }

      colStruct.dataOid = column.oid;
      colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
      colStruct.tokenFlag = false;
      colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
      colStruct.colDataType = column.colType.colDataType;
      colStruct.fColDbRoot = dbroot;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        colStruct.fCompressionType = 2;
        dctnryStruct.fCompressionType = 2;
      }

      dctnryStruct.fColDbRoot = dbroot;

      if (colStruct.tokenFlag)
      {
        dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
        dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
        dctnryStruct.columnOid = column.oid;
      }
      else
      {
        dctnryStruct.dctnryOid = 0;
        dctnryStruct.columnOid = column.oid;
      }

      colStructs.push_back(colStruct);
      cscColTypeList.push_back(column.colType);
      oids[colStruct.dataOid] = colStruct.dataOid;

      // oidsToFlush.push_back(colStruct.dataOid);
      if (dctnryStruct.dctnryOid > 0)
      {
        oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
        // oidsToFlush.push_back(dctnryStruct.dctnryOid);
      }

      colTuples.push_back(colTuple);

      dctColTuples.push_back(tmpStr);

      colValuesList.push_back(colTuples);

      dctnryStructList.push_back(dctnryStruct);

      dctnryValueList.push_back(dctColTuples);

      colTuples.pop_back();
      dctColTuples.pop_back();

      ++column_iterator;
    }

    // fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3);
    fWEWrapper.setTransId(txnID);
    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(false);
    fWEWrapper.startTransaction(txnID);

    if (0 != colStructs.size())
    {
      // MCOL-66 The DBRM can't handle concurrent transactions to sys tables
      // TODO: This may be redundant
      static boost::mutex dbrmMutex;
      boost::mutex::scoped_lock lk(dbrmMutex);
      error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
                                             dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);

      if (error != WriteEngine::NO_ERROR)
      {
        if (error == ERR_BRM_WR_VB_ENTRY)
        {
          throw std::runtime_error("WE: Error writing to BRM.");
        }
        else
        {
          WErrorCodes ec;
          throw std::runtime_error("WE: Error updating calpontsys.systable:" + ec.errorString(error));
        }
      }

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        int rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }
    }
  }
  catch (exception& ex)
  {
    err += ex.what();
    rc = 1;
  }
  catch (...)
  {
    err += "Unknown exception caught";
    rc = 1;
  }

  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  return rc;
}

uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32, columnSize, dictSize, i;
  uint8_t tmp8;
  int txnID, colpos;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> columnSize;

  // deserialize column Oid and dictionary oid
  vector<uint32_t> coloids;
  vector<uint32_t> dictoids;

  for (i = 0; i < columnSize; ++i)
  {
    bs >> tmp32;
    coloids.push_back(tmp32);
  }

  bs >> dictSize;

  for (i = 0; i < dictSize; ++i)
  {
    bs >> tmp32;
    dictoids.push_back(tmp32);
  }

  bool alterFlag = 0;
  bs >> tmp8;
  alterFlag = (tmp8 != 0);
  bs >> tmp32;
  colpos = tmp32;
  bs >> tmp32;
  uint16_t dbroot = tmp32;
  ddlpackage::TableDef tableDef;
  tableDef.unserialize(bs);

  WriteEngine::ColTuple colTuple;
  WriteEngine::ColStruct colStruct;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::ColTupleList colTuples;
  WriteEngine::dictStr dctColTuples;
  WriteEngine::DctnryStruct dctnryStruct;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DictStrList dctnryValueList;
  WriteEngine::RIDList ridList;
  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::ROPair sysTableROPair;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  ColumnList columns;
  ColumnList::const_iterator column_iterator;
  DDLColumn column;
  int error = 0;

  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  std::map<uint32_t, uint32_t> oids;
  int rc1 = 0;
  ColumnDef* colDefPtr = 0;
  ColumnDefList::const_iterator iter;

  int startPos = colpos;

  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;

  getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);
  unsigned int numCols = columns.size();
  // WriteEngine::ColTupleList colList[numCols];
  // ColTupleList is NOT POD, so let's try this:
  std::vector<WriteEngine::ColTupleList> colList;
  // WriteEngine::dictStr dctColList[numCols];
  std::vector<WriteEngine::dictStr> dctColList;
  ColumnDefList tableDefCols = tableDef.fColumns;

  ddlpackage::QualifiedName qualifiedName = *(tableDef.fQualifiedName);
  iter = tableDefCols.begin();
  // colpos = 0;
  NullString tmpStr;

  for (unsigned int ii = 0; ii < numCols; ii++)
  {
    colList.push_back(WriteEngine::ColTupleList());
    dctColList.push_back(WriteEngine::dictStr());
  }

  try
  {
    unsigned int col = 0;
    unsigned int dictcol = 0;

    while (iter != tableDefCols.end())
    {
      colDefPtr = *iter;

      DictOID dictOID = {0, 0, 0, 0, 0};

      int dataType;
      dataType = convertDataType(colDefPtr->fType->fType);

      if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL)
      {
        if (colDefPtr->fType->fPrecision > 38)  // precision cannot be over 38.
        {
          ostringstream os;
          os << "Syntax error: The maximum precision (total number of digits) that can be specified is 38";
          throw std::runtime_error(os.str());
        }
        else if (colDefPtr->fType->fPrecision < colDefPtr->fType->fScale)
        {
          ostringstream os;
          os << "Syntax error: scale should be less than precision, precision: "
             << colDefPtr->fType->fPrecision << " scale: " << colDefPtr->fType->fScale;
          throw std::runtime_error(os.str());
        }

        colDefPtr->convertDecimal();
      }

      bool hasDict = false;

      if ((dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
          (dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
          (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
          (dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ||
          (dataType == CalpontSystemCatalog::TEXT && colDefPtr->fType->fLength > 7))
      {
        hasDict = true;
        dictOID.compressionType = colDefPtr->fType->fCompressiontype;
        dictOID.colWidth = colDefPtr->fType->fLength;
        dictOID.dictOID = dictoids[dictcol];
        dictcol++;

        //@Bug 2534. Take away the limit of 255 and set the limit to 8000.
        if ((colDefPtr->fType->fLength > 8000) && (dataType != CalpontSystemCatalog::BLOB) &&
            (dataType != CalpontSystemCatalog::TEXT))
        {
          ostringstream os;
          os << "char, varchar and varbinary length may not exceed 8000 bytes";
          throw std::runtime_error(os.str());
        }
      }
      else if ((dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
                dataType == CalpontSystemCatalog::TEXT) &&
               colDefPtr->fType->fLength <= 7)
      {
        ostringstream os;
        os << "varbinary and blob length may not be less than 8";
        throw std::runtime_error(os.str());
      }

      unsigned int i = 0;
      column_iterator = columns.begin();

      while (column_iterator != columns.end())
      {
        column = *column_iterator;
        boost::to_lower(column.tableColName.column);

        if (SCHEMA_COL == column.tableColName.column)
        {
          colTuple.data = qualifiedName.fSchema;
          tmpStr.assign(qualifiedName.fSchema);
        }
        else if (TABLENAME_COL == column.tableColName.column)
        {
          colTuple.data = qualifiedName.fName;
          tmpStr.assign(qualifiedName.fName);
        }
        else if (COLNAME_COL == column.tableColName.column)
        {
          boost::to_lower(colDefPtr->fName);
          colTuple.data = colDefPtr->fName;
          tmpStr.assign(colDefPtr->fName);
        }
        else if (OBJECTID_COL == column.tableColName.column)
        {
          if (alterFlag)
            colTuple.data = coloids[col];
          else
            colTuple.data = coloids[col];
        }
        else if (DATATYPE_COL == column.tableColName.column)
        {
          colTuple.data = dataType;
        }
        else if (COLUMNLEN_COL == column.tableColName.column)
        {
          //@Bug 2089 Disallow zero length char and varch column to be created
          if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR ||
              dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
              dataType == CalpontSystemCatalog::TEXT)
          {
            if (colDefPtr->fType->fLength <= 0)
            {
              ostringstream os;
              os << "char, varchar and varbinary length must be greater than zero";
              throw std::runtime_error(os.str());
            }
          }

          colTuple.data = colDefPtr->fType->fLength;
        }
        else if (COLUMNPOS_COL == column.tableColName.column)
        {
          colTuple.data = colpos;
        }
        else if (DEFAULTVAL_COL == column.tableColName.column)
        {
          if (colDefPtr->fDefaultValue && !colDefPtr->fDefaultValue->fNull)
          {
            tmpStr.assign(colDefPtr->fDefaultValue->fValue);
          }
          else
          {
            tmpStr.dropString();
          }
          colTuple.data = tmpStr;
        }
        else if (NULLABLE_COL == column.tableColName.column)
        {
          int nullable = 1;
          ColumnConstraintList& colConstraints = colDefPtr->fConstraints;
          ColumnConstraintList::const_iterator constraint_iter = colConstraints.begin();

          while (constraint_iter != colConstraints.end())
          {
            ColumnConstraintDef* consDefPtr = *constraint_iter;

            if (consDefPtr->fConstraintType == ddlpackage::DDL_NOT_NULL)
            {
              nullable = 0;
              break;
            }

            ++constraint_iter;
          }

          colTuple.data = nullable;
        }
        else if (SCALE_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fScale;
        }
        else if (PRECISION_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fPrecision;
        }
        else if (DICTOID_COL == column.tableColName.column)
        {
          if (hasDict)
          {
            colTuple.data = dictOID.dictOID;
          }
          else
          {
            colTuple.data = column.colType.getNullValueForType();
          }
        }
        else if (LISTOBJID_COL == column.tableColName.column)
        {
          colTuple.data = column.colType.getNullValueForType();
        }
        else if (TREEOBJID_COL == column.tableColName.column)
        {
          colTuple.data = column.colType.getNullValueForType();
        }
        else if (MINVAL_COL == column.tableColName.column)
        {
          tmpStr.dropString();
        }
        else if (MAXVAL_COL == column.tableColName.column)
        {
          tmpStr.dropString();
        }
        else if (COMPRESSIONTYPE_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fCompressiontype;
        }
        else if (AUTOINC_COL == column.tableColName.column)
        {
          // cout << "autoincrement= " << colDefPtr->fType->fAutoincrement << endl;
          colTuple.data = colDefPtr->fType->fAutoincrement;
        }
        else if (NEXTVALUE_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fNextvalue;
        }
        else if (CHARSETNUM_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fCharsetNum;
        }
        else
        {
          colTuple.data = column.colType.getNullValueForType();
        }

        colStruct.dataOid = column.oid;
        colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
        colStruct.tokenFlag = false;
        colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
        colStruct.colDataType = column.colType.colDataType;
        colStruct.fColDbRoot = dbroot;
        dctnryStruct.fColDbRoot = dbroot;

        if (idbdatafile::IDBPolicy::useHdfs())
        {
          colStruct.fCompressionType = 2;
          dctnryStruct.fCompressionType = 2;
        }

        if (colStruct.tokenFlag)  // TODO: XXX: this is copied aplenty. NEED TO REFACTOR.
        {
          dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
          dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
          dctnryStruct.columnOid = column.oid;
        }
        else
        {
          dctnryStruct.dctnryOid = 0;
          dctnryStruct.columnOid = column.oid;
        }

        oids[colStruct.dataOid] = colStruct.dataOid;

        // oidsToFlush.push_back(colStruct.dataOid);
        if (dctnryStruct.dctnryOid > 0)
        {
          oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
          // oidsToFlush.push_back(dctnryStruct.dctnryOid);
        }

        if (colpos == startPos)
        {
          colStructs.push_back(colStruct);
          dctnryStructList.push_back(dctnryStruct);
          cscColTypeList.push_back(column.colType);
        }

        colList[i].push_back(colTuple);
        // colList.push_back(WriteEngine::ColTupleList());
        // colList.back().push_back(colTuple);
        dctColList[i].push_back(tmpStr);
        // dctColList.push_back(WriteEngine::dictStr());
        // dctColList.back().push_back(tmpStr);
        ++i;
        ++column_iterator;
      }

      ++colpos;
      col++;
      ++iter;
    }

    fWEWrapper.setTransId(txnID);
    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(false);
    fWEWrapper.startTransaction(txnID);

    if (0 != colStructs.size())
    {
      for (unsigned int n = 0; n < numCols; n++)
      {
        colValuesList.push_back(colList[n]);
        dctnryValueList.push_back(dctColList[n]);
      }

      // fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3);
      error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
                                             dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }

      if (error != WriteEngine::NO_ERROR)
      {
        if (error == ERR_BRM_WR_VB_ENTRY)
        {
          throw std::runtime_error("writeSysColumnMetaData WE: Error writing to BRM.");
        }
        else
        {
          WErrorCodes ec;
          throw std::runtime_error("WE: Error updating calpontsys.syscolumn. " + ec.errorString(error));
        }
      }
      else
        error = rc1;
    }
  }
  catch (exception& ex)
  {
    err += ex.what();
    rc = 1;
  }
  catch (...)
  {
    err += "Unknown exception caught";
    rc = 1;
  }

  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  return rc;
}

uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32, coloid, dictoid;
  int txnID, startPos;
  string schema, tablename;
  uint8_t tmp8;
  bool isAlter = false;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;
  bs >> coloid;
  bs >> dictoid;
  bs >> tmp8;  // alterFlag
  bs >> tmp32;
  startPos = tmp32;
  isAlter = (tmp8 != 0);
  boost::scoped_ptr<ddlpackage::ColumnDef> colDefPtr(new ddlpackage::ColumnDef());
  colDefPtr->unserialize(bs);

  WriteEngine::ColStruct colStruct;
  WriteEngine::ColTuple colTuple;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::ColTupleList colTuples;
  WriteEngine::DctColTupleList dctColTuples;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::RIDList ridList;
  WriteEngine::DctnryStruct dctnryStruct;
  WriteEngine::dictStr dctnryTuple;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DictStrList dctnryValueList;

  CalpontSystemCatalog::TableName tableName;

  ColumnList columns;
  ColumnList::const_iterator column_iterator;

  DDLColumn column;
  int error = 0;

  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;

  getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);
  unsigned int numCols = columns.size();
  // WriteEngine::ColTupleList colList[numCols];
  // ColTupleList is NOT POD, so let's try this:
  std::vector<WriteEngine::ColTupleList> colList;
  // WriteEngine::dictStr dctColList[numCols];
  std::vector<WriteEngine::dictStr> dctColList;
  std::map<uint32_t, uint32_t> oids;
  std::vector<BRM::OID_t> oidsToFlush;
  // colpos = 0;

  for (unsigned int ii = 0; ii < numCols; ii++)
  {
    colList.push_back(WriteEngine::ColTupleList());
    dctColList.push_back(WriteEngine::dictStr());
  }

  try
  {
    DictOID dictOID = {0, 0, 0, 0, 0};

    int dataType = convertDataType(colDefPtr->fType->fType);

    if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL)
    {
      if (colDefPtr->fType->fPrecision > 38)  //@Bug 5717 precision cannot be over 38.
      {
        ostringstream os;
        os << "Syntax error: The maximum precision (total number of digits) that can be specified is 38";
        throw std::runtime_error(os.str());
      }
      else if (colDefPtr->fType->fPrecision < colDefPtr->fType->fScale)
      {
        ostringstream os;
        os << "Syntax error: scale should be less than precision, precision: " << colDefPtr->fType->fPrecision
           << " scale: " << colDefPtr->fType->fScale;
        throw std::runtime_error(os.str());
      }

      colDefPtr->convertDecimal();
    }

    if (dictoid > 0)
    {
      dictOID.compressionType = colDefPtr->fType->fCompressiontype;
      dictOID.colWidth = colDefPtr->fType->fLength;
      dictOID.dictOID = dictoid;

      //@Bug 2534. Take away the limit of 255 and set the limit to 8000.
      if ((colDefPtr->fType->fLength > 8000) && (dataType != CalpontSystemCatalog::BLOB) &&
          (dataType != CalpontSystemCatalog::TEXT))
      {
        ostringstream os;
        os << "char, varchar and varbinary length may not exceed 8000 bytes";
        throw std::runtime_error(os.str());
      }
    }
    else if ((dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
              dataType == CalpontSystemCatalog::TEXT) &&
             colDefPtr->fType->fLength <= 7)
    {
      ostringstream os;
      os << "varbinary and blob length may not be less than 8";
      throw std::runtime_error(os.str());
    }

    unsigned int i = 0;
    uint16_t dbRoot;
    BRM::OID_t sysOid = 1021;
    // Find out where syscolumn is
    rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
    column_iterator = columns.begin();

    while (column_iterator != columns.end())
    {
      NullString tmpStr;
      column = *column_iterator;
      boost::to_lower(column.tableColName.column);

      if (SCHEMA_COL == column.tableColName.column)
      {
        colTuple.data = schema;
        tmpStr.assign(schema);
      }
      else if (TABLENAME_COL == column.tableColName.column)
      {
        colTuple.data = tablename;
        tmpStr.assign(tablename);
      }
      else if (COLNAME_COL == column.tableColName.column)
      {
        boost::to_lower(colDefPtr->fName);
        colTuple.data = colDefPtr->fName;
        tmpStr.assign(colDefPtr->fName);
      }
      else if (OBJECTID_COL == column.tableColName.column)
      {
        colTuple.data = coloid;
      }
      else if (DATATYPE_COL == column.tableColName.column)
      {
        colTuple.data = dataType;
      }
      else if (COLUMNLEN_COL == column.tableColName.column)
      {
        //@Bug 2089 Disallow zero length char and varch column to be created
        if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR ||
            dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
            dataType == CalpontSystemCatalog::TEXT)
        {
          if (colDefPtr->fType->fLength <= 0)
          {
            ostringstream os;
            os << "char, varchar and varbinary length must be greater than zero";
            throw std::runtime_error(os.str());
          }
        }

        colTuple.data = colDefPtr->fType->fLength;
      }
      else if (COLUMNPOS_COL == column.tableColName.column)
      {
        colTuple.data = startPos;
      }
      else if (DEFAULTVAL_COL == column.tableColName.column)
      {
        if (colDefPtr->fDefaultValue && !colDefPtr->fDefaultValue->fNull)
        {
          tmpStr.assign(colDefPtr->fDefaultValue->fValue);
        }
        else
        {
          tmpStr.dropString();
          // colTuple.data = column.colType.getNullValueForType();
        }
        colTuple.data = tmpStr;
      }
      else if (NULLABLE_COL == column.tableColName.column)
      {
        int nullable = 1;
        ColumnConstraintList& colConstraints = colDefPtr->fConstraints;
        ColumnConstraintList::const_iterator constraint_iter = colConstraints.begin();

        while (constraint_iter != colConstraints.end())
        {
          ColumnConstraintDef* consDefPtr = *constraint_iter;

          if (consDefPtr->fConstraintType == ddlpackage::DDL_NOT_NULL)
          {
            nullable = 0;
            break;
          }

          ++constraint_iter;
        }

        colTuple.data = nullable;
      }
      else if (SCALE_COL == column.tableColName.column)
      {
        colTuple.data = colDefPtr->fType->fScale;
      }
      else if (PRECISION_COL == column.tableColName.column)
      {
        colTuple.data = colDefPtr->fType->fPrecision;
      }
      else if (DICTOID_COL == column.tableColName.column)
      {
        if (dictoid > 0)
        {
          colTuple.data = dictOID.dictOID;
        }
        else
        {
          colTuple.data = column.colType.getNullValueForType();
        }
      }
      else if (LISTOBJID_COL == column.tableColName.column)
      {
        colTuple.data = column.colType.getNullValueForType();
      }
      else if (TREEOBJID_COL == column.tableColName.column)
      {
        colTuple.data = column.colType.getNullValueForType();
      }
      else if (MINVAL_COL == column.tableColName.column)
      {
        tmpStr.dropString();
      }
      else if (MAXVAL_COL == column.tableColName.column)
      {
        tmpStr.dropString();
      }
      else if (COMPRESSIONTYPE_COL == column.tableColName.column)
      {
        colTuple.data = colDefPtr->fType->fCompressiontype;
      }
      else if (AUTOINC_COL == column.tableColName.column)
      {
        // cout << "autoincrement= " << colDefPtr->fType->fAutoincrement << endl;
        colTuple.data = colDefPtr->fType->fAutoincrement;
      }
      else if (NEXTVALUE_COL == column.tableColName.column)
      {
        colTuple.data = colDefPtr->fType->fNextvalue;
      }
      else if (CHARSETNUM_COL == column.tableColName.column)
      {
        colTuple.data = colDefPtr->fType->fCharsetNum;
      }
      else
      {
        colTuple.data = column.colType.getNullValueForType();
      }

      colStruct.dataOid = column.oid;
      oids[column.oid] = column.oid;
      oidsToFlush.push_back(column.oid);
      colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
      colStruct.tokenFlag = false;
      colStruct.fColDbRoot = dbRoot;
      colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
      colStruct.colDataType = column.colType.colDataType;
      dctnryStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        colStruct.fCompressionType = 2;
        dctnryStruct.fCompressionType = 2;
      }

      if (colStruct.tokenFlag)
      {
        dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
        dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
        dctnryStruct.columnOid = column.oid;
      }
      else
      {
        dctnryStruct.dctnryOid = 0;
        dctnryStruct.columnOid = column.oid;
      }

      if (dctnryStruct.dctnryOid > 0)
      {
        oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
        oidsToFlush.push_back(dctnryStruct.dctnryOid);
      }

      colStructs.push_back(colStruct);
      dctnryStructList.push_back(dctnryStruct);
      cscColTypeList.push_back(column.colType);
      colList[i].push_back(colTuple);
      // colList.push_back(WriteEngine::ColTupleList());
      // colList.back().push_back(colTuple);
      dctColList[i].push_back(tmpStr);
      // dctColList.push_back(WriteEngine::dictStr());
      // dctColList.back().push_back(tmpStr);
      ++i;
      ++column_iterator;
    }

    if (0 != colStructs.size())
    {
      // FIXME: Is there a cleaner way to do this? Isn't colValuesList the same as colList after this?
      for (unsigned int n = 0; n < numCols; n++)
      {
        colValuesList.push_back(colList[n]);
        dctnryValueList.push_back(dctColList[n]);
      }

      // fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3);
      fWEWrapper.setTransId(txnID);
      fWEWrapper.setIsInsert(true);
      fWEWrapper.setBulkFlag(false);
      fWEWrapper.startTransaction(txnID);
      int rc1 = 0;

      error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
                                             dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }

      if (error != WriteEngine::NO_ERROR)
      {
        if (error == ERR_BRM_WR_VB_ENTRY)
        {
          throw std::runtime_error("writeSysColumnMetaData WE: Error writing to BRM.");
        }
        else
        {
          WErrorCodes ec;
          throw std::runtime_error("WE: Error updating calpontsys.syscolumn. " + ec.errorString(error));
        }
      }
      else
        error = rc1;
    }
  }
  catch (exception& ex)
  {
    err += ex.what();
    rc = 1;
  }
  catch (...)
  {
    err += "Unknown exception caught";
    rc = 1;
  }

  purgeFDCache();

  if (isAlter)
  {
    if (idbdatafile::IDBPolicy::useHdfs())
      cacheutils::flushOIDsFromCache(oidsToFlush);
  }

  return rc;
}

uint8_t WE_DDLCommandProc::createtablefiles(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t size, i;
  uint16_t tmp16;
  uint32_t tmp32;
  uint8_t tmp8;
  OID dataOid;
  int colWidth;
  bool tokenFlag;
  int txnID;
  CalpontSystemCatalog::ColDataType colDataType;
  uint16_t colDbRoot;
  int compressionType;
  bs >> tmp32;
  txnID = tmp32;
  bs >> size;
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);
  std::map<uint32_t, uint32_t> oids;

  for (i = 0; i < size; ++i)
  {
    bs >> tmp32;
    dataOid = tmp32;
    bs >> tmp8;
    colDataType = (CalpontSystemCatalog::ColDataType)tmp8;
    bs >> tmp8;
    tokenFlag = (tmp8 != 0);
    bs >> tmp32;
    colWidth = tmp32;
    bs >> tmp16;
    colDbRoot = tmp16;
    bs >> tmp32;
    compressionType = tmp32;
    oids[dataOid] = dataOid;

    if (tokenFlag)
    {
      rc = fWEWrapper.createDctnry(0, dataOid, colWidth, colDbRoot, 0, 0, compressionType);
    }
    else
    {
      rc = fWEWrapper.createColumn(0, dataOid, colDataType, colWidth, colDbRoot, 0, compressionType);
    }

    if (rc != 0)
      break;
  }

  // cout << "creating column file got error code " << rc << endl;
  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream oss;
    oss << "WE: Error creating column file for oid " << dataOid << "; " << ec.errorString(rc) << endl;
    err = oss.str();
  }

  // if (idbdatafile::IDBPolicy::useHdfs())
  fWEWrapper.flushDataFiles(rc, txnID, oids);
  purgeFDCache();
  return rc;
}

uint8_t WE_DDLCommandProc::commitVersion(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t tmp32;
  int txnID;

  bs >> tmp32;
  txnID = tmp32;

  rc = fWEWrapper.commit(txnID);

  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream oss;
    oss << "WE: Error commiting transaction; " << txnID << ec.errorString(rc) << endl;
    err = oss.str();
  }

  return rc;
}

uint8_t WE_DDLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  ;
  int txnID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);
  rc = fWEWrapper.rollbackBlocks(txnID, sessionID);

  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream oss;
    oss << "WE: Error rolling back files " << txnID << " for session " << sessionID << "; "
        << ec.errorString(rc) << endl;
    err = oss.str();
  }

  std::map<uint32_t, uint32_t> oids;

  if (idbdatafile::IDBPolicy::useHdfs())
    fWEWrapper.flushDataFiles(rc, txnID, oids);

  purgeFDCache();
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  return rc;
}

uint8_t WE_DDLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  int txnID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;

  rc = fWEWrapper.rollbackVersion(txnID, sessionID);

  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream oss;
    oss << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; "
        << ec.errorString(rc) << endl;
    err = oss.str();
  }

  purgeFDCache();
  return rc;
}

uint8_t WE_DDLCommandProc::deleteSyscolumn(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  int txnID;
  string schema, tablename;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;

  ddlpackage::QualifiedName sysCatalogTableName;
  sysCatalogTableName.fSchema = CALPONT_SCHEMA;
  sysCatalogTableName.fName = SYSCOLUMN_TABLE;

  CalpontSystemCatalog::TableName userTableName;
  userTableName.schema = schema;
  userTableName.table = tablename;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  // Find out where syscolumn is
  rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.startTransaction(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);

  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;

  try
  {
    CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(userTableName);

    WriteEngine::ColStruct colStruct;
    WriteEngine::ColStructList colStructs;
    WriteEngine::CSCTypesList cscColTypeList;
    std::vector<WriteEngine::ColStructList> colExtentsStruct;
    std::vector<WriteEngine::CSCTypesList> colExtentsColType;
    std::vector<void*> colValuesList;
    WriteEngine::RIDList ridList;
    std::vector<WriteEngine::RIDList> ridLists;
    DDLColumn column;
    CalpontSystemCatalog::RIDList::const_iterator colrid_iterator = colRidList.begin();

    while (colrid_iterator != colRidList.end())
    {
      WriteEngine::RID rid = (*colrid_iterator).rid;
      ridList.push_back(rid);
      ++colrid_iterator;
    }

    ColumnList columns;
    getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);

    ColumnList::const_iterator column_iterator = columns.begin();

    while (column_iterator != columns.end())
    {
      column = *column_iterator;
      colStruct.dataOid = column.oid;
      colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
      colStruct.colDataType = column.colType.colDataType;
      colStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        colStruct.fCompressionType = 2;
      }

      oids[colStruct.dataOid] = colStruct.dataOid;
      // oidsToFlush.push_back(colStruct.dataOid);
      colStructs.push_back(colStruct);
      cscColTypeList.push_back(column.colType);

      ++column_iterator;
    }

    colExtentsStruct.push_back(colStructs);
    colExtentsColType.push_back(cscColTypeList);
    ridLists.push_back(ridList);

    if (0 != colStructs.size() && 0 != ridLists[0].size())
    {
      int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
                                       SYSCOLUMN_BASE);

      int rc1 = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }

      if (error == NO_ERROR)
        rc = rc1;
      else
        rc = error;
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "Unknown exception caught";
    rc = 1;
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}

uint8_t WE_DDLCommandProc::deleteSyscolumnRow(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  ;
  int txnID;
  string schema, tablename, columnname;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;
  bs >> columnname;

  ddlpackage::QualifiedName sysCatalogTableName;
  sysCatalogTableName.fSchema = CALPONT_SCHEMA;
  sysCatalogTableName.fName = SYSCOLUMN_TABLE;

  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = schema;
  tableColName.table = tablename;
  tableColName.column = columnname;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  // Find out where syscolumn is
  rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;

  try
  {
    CalpontSystemCatalog::ROPair colRO = systemCatalogPtr->columnRID(tableColName);

    if (colRO.objnum < 0)
    {
      err = "Column not found:" + tableColName.table + "." + tableColName.column;
      throw std::runtime_error(err);
    }

    WriteEngine::ColStruct colStruct;
    WriteEngine::ColStructList colStructs;
    WriteEngine::CSCTypesList cscColTypeList;
    std::vector<WriteEngine::ColStructList> colExtentsStruct;
    std::vector<WriteEngine::CSCTypesList> colExtentsColType;
    std::vector<void*> colValuesList;
    WriteEngine::RIDList ridList;
    std::vector<WriteEngine::RIDList> ridLists;
    DDLColumn column;

    ridList.push_back(colRO.rid);

    ColumnList columns;
    getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);

    ColumnList::const_iterator column_iterator = columns.begin();

    while (column_iterator != columns.end())
    {
      column = *column_iterator;
      colStruct.dataOid = column.oid;
      colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
      colStruct.colDataType = column.colType.colDataType;
      colStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        colStruct.fCompressionType = 2;
      }

      oids[colStruct.dataOid] = colStruct.dataOid;
      // oidsToFlush.push_back(colStruct.dataOid);
      colStructs.push_back(colStruct);
      cscColTypeList.push_back(column.colType);

      ++column_iterator;
    }

    colExtentsStruct.push_back(colStructs);
    colExtentsColType.push_back(cscColTypeList);
    ridLists.push_back(ridList);

    if (0 != colStructs.size() && 0 != ridLists[0].size())
    {
      int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
                                       SYSCOLUMN_BASE);

      int rc1 = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }

      if (error == NO_ERROR)
        rc = rc1;
      else
        rc = error;
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "Unknown exception caught";
    rc = 1;
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);

  return rc;
}

uint8_t WE_DDLCommandProc::deleteSystable(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  ;
  int txnID;
  string schema, tablename;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;

  WriteEngine::WriteEngineWrapper writeEngine;
  ddlpackage::QualifiedName sysCatalogTableName;
  sysCatalogTableName.fSchema = CALPONT_SCHEMA;
  sysCatalogTableName.fName = SYSTABLE_TABLE;

  CalpontSystemCatalog::TableName userTableName;
  userTableName.schema = schema;
  userTableName.table = tablename;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  uint16_t dbRoot;
  BRM::OID_t sysOid = 1001;

  // Find out where systcolumn is
  rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
  fWEWrapper.setTransId(txnID);

  fWEWrapper.startTransaction(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;

  try
  {
    CalpontSystemCatalog::ROPair userTableROPair = systemCatalogPtr->tableRID(userTableName);

    if (userTableROPair.rid == std::numeric_limits<WriteEngine::RID>::max())
    {
      err = "RowID is not valid ";
      throw std::runtime_error(err);
    }

    WriteEngine::ColStruct colStruct;
    WriteEngine::ColStructList colStructs;
    WriteEngine::CSCTypesList cscColTypeList;
    std::vector<WriteEngine::ColStructList> colExtentsStruct;
    std::vector<WriteEngine::CSCTypesList> colExtentsColType;
    std::vector<void*> colValuesList;
    WriteEngine::RIDList ridList;
    std::vector<WriteEngine::RIDList> ridLists;
    DDLColumn column;
    ridList.push_back(userTableROPair.rid);

    ColumnList columns;
    getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);

    ColumnList::const_iterator column_iterator = columns.begin();

    while (column_iterator != columns.end())
    {
      column = *column_iterator;
      colStruct.dataOid = column.oid;
      colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
      colStruct.colDataType = column.colType.colDataType;
      colStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        colStruct.fCompressionType = 2;
      }

      oids[colStruct.dataOid] = colStruct.dataOid;
      // oidsToFlush.push_back(colStruct.dataOid);
      colStructs.push_back(colStruct);
      cscColTypeList.push_back(column.colType);

      ++column_iterator;
    }

    colExtentsStruct.push_back(colStructs);
    colExtentsColType.push_back(cscColTypeList);
    ridLists.push_back(ridList);

    if (0 != colStructs.size() && 0 != ridLists[0].size())
    {
      int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
                                       SYSCOLUMN_BASE);

      int rc1 = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }

      if (error == NO_ERROR)
        rc = rc1;
      else
        rc = error;
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "Unknown exception caught";
    rc = 1;
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);

  return rc;
}

uint8_t WE_DDLCommandProc::deleteSystables(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  ;
  int txnID;
  string schema, tablename;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;

  WriteEngine::WriteEngineWrapper writeEngine;
  ddlpackage::QualifiedName sysCatalogTableName;
  sysCatalogTableName.fSchema = CALPONT_SCHEMA;
  sysCatalogTableName.fName = SYSTABLE_TABLE;

  CalpontSystemCatalog::TableName userTableName;
  userTableName.schema = schema;
  userTableName.table = tablename;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  WriteEngine::ColStruct colStruct;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<void*> colValuesList;
  WriteEngine::RIDList ridList;
  std::vector<WriteEngine::RIDList> ridLists;
  DDLColumn column;
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1003;
  // Find out where systable is
  rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);
  std::map<uint32_t, uint32_t> oids;

  // std::vector<BRM::OID_t>  oidsToFlush;
  try
  {
    CalpontSystemCatalog::ROPair userTableROPair = systemCatalogPtr->tableRID(userTableName);

    if (userTableROPair.rid == std::numeric_limits<WriteEngine::RID>::max())
    {
      err = "RowID is not valid ";
      throw std::runtime_error(err);
    }

    ridList.push_back(userTableROPair.rid);

    ColumnList columns;
    getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);

    ColumnList::const_iterator column_iterator = columns.begin();

    while (column_iterator != columns.end())
    {
      column = *column_iterator;
      colStruct.dataOid = column.oid;
      colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
      colStruct.colDataType = column.colType.colDataType;
      colStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        colStruct.fCompressionType = 2;
      }

      oids[colStruct.dataOid] = colStruct.dataOid;
      // oidsToFlush.push_back(colStruct.dataOid);
      colStructs.push_back(colStruct);
      cscColTypeList.push_back(column.colType);

      ++column_iterator;
    }

    colExtentsStruct.push_back(colStructs);
    colExtentsColType.push_back(cscColTypeList);
    ridLists.push_back(ridList);

    {
      int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
                                       SYSCOLUMN_BASE);

      int rc1 = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }

      if (error == NO_ERROR)
        rc = rc1;
      else
        rc = error;
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "Unknown exception caught";
    rc = 1;
  }

  if (rc != 0)
    return rc;

  // deleting from SYSCOLUMN
  sysCatalogTableName.fSchema = CALPONT_SCHEMA;
  sysCatalogTableName.fName = SYSCOLUMN_TABLE;
  sysOid = 1021;
  // Find out where syscolumn is
  rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);

  try
  {
    CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(userTableName);

    colStructs.clear();
    cscColTypeList.clear();
    colExtentsStruct.clear();
    colExtentsColType.clear();
    colValuesList.clear();
    ridList.clear();
    ridLists.clear();
    oids.clear();
    DDLColumn column;
    CalpontSystemCatalog::RIDList::const_iterator colrid_iterator = colRidList.begin();

    while (colrid_iterator != colRidList.end())
    {
      WriteEngine::RID rid = (*colrid_iterator).rid;
      ridList.push_back(rid);
      ++colrid_iterator;
    }

    ColumnList columns;
    getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);

    ColumnList::const_iterator column_iterator = columns.begin();

    while (column_iterator != columns.end())
    {
      column = *column_iterator;
      colStruct.dataOid = column.oid;
      colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
      colStruct.colDataType = column.colType.colDataType;
      colStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        colStruct.fCompressionType = 2;
      }

      colStructs.push_back(colStruct);
      oids[colStruct.dataOid] = colStruct.dataOid;
      cscColTypeList.push_back(column.colType);
      // oidsToFlush.push_back(colStruct.dataOid);
      ++column_iterator;
    }

    colExtentsStruct.push_back(colStructs);
    colExtentsColType.push_back(cscColTypeList);
    ridLists.push_back(ridList);

    if (0 != colStructs.size() && 0 != ridLists[0].size())
    {
      int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
                                       SYSCOLUMN_BASE);

      int rc1 = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }

      if (error == NO_ERROR)
        rc = rc1;
      else
        rc = error;
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "Unknown exception caught";
    rc = 1;
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}

uint8_t WE_DDLCommandProc::dropFiles(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t size, i;
  uint32_t tmp32;
  std::vector<int32_t> dataOids;

  bs >> size;

  for (i = 0; i < size; ++i)
  {
    bs >> tmp32;
    dataOids.push_back(tmp32);
  }

  try
  {
    rc = fWEWrapper.dropFiles(0, dataOids);
  }
  catch (...)
  {
    err = "WE: Error removing files ";
    rc = 1;
  }

  purgeFDCache();
  return rc;
}

uint8_t WE_DDLCommandProc::updateSyscolumnAuto(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  std::string schema, tablename;
  int txnID;
  uint8_t tmp8;
  bool autoIncrement = false;

  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;
  bs >> tmp8;
  autoIncrement = true;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = tablename;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;
  CalpontSystemCatalog::RIDList roList;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  try
  {
    roList = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  // Build colStructs for SYSTABLE
  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;

  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;
  DDLColumn column;
  WriteEngine::ColTuple colTuple;

  findColumnData(sessionID, tableName, AUTOINC_COL, column);
  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
  }

  string s1("y"), s2("n");
  boost::any datavalue1 = s1;
  boost::any datavalue2 = s2;

  if (autoIncrement)
    colTuple.data = datavalue1;
  else
    colTuple.data = datavalue2;

  colStruct.colDataType = column.colType.colDataType;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  // oidsToFlush.push_back(colStruct.dataOid);
  dctnryStructList.push_back(dctnryStruct);
  cscColTypeList.push_back(column.colType);

  for (unsigned int i = 0; i < roList.size(); i++)
  {
    aColList.push_back(colTuple);
  }

  colValuesList.push_back(aColList);
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;
  CalpontSystemCatalog::OID oid = 1021;
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.setTransId(txnID);

  for (unsigned int i = 0; i < roList.size(); i++)
  {
    convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);

    aExtentinfo.dbRoot = dbRoot;
    aExtentinfo.partition = partition;
    aExtentinfo.segment = segment;

    if (extentsinfo.empty())
      extentsinfo.push_back(aExtentinfo);
    else if (extentsinfo.back() != aExtentinfo)
      extentsinfo.push_back(aExtentinfo);

    ridList.push_back(roList[i].rid);
  }

  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  if (idbdatafile::IDBPolicy::useHdfs())
    fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;
  }

  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);

  return rc;
}

uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  std::string schema, tablename;
  int txnID;
  uint8_t tmp8;
  bool autoIncrement = false;

  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;
  bs >> tmp8;
  autoIncrement = true;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = tablename;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  CalpontSystemCatalog::RIDList roList;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  try
  {
    roList = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  // Build colStructs for SYSTABLE
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;
  DDLColumn column;
  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  WriteEngine::ColTuple colTuple;
  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;

  boost::any datavalue;
  findColumnData(sessionID, tableName, AUTOINC_COL, column);
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  string ystr("y");
  string nstr("n");

  if (autoIncrement)
    colTuple.data = ystr;
  else
    colTuple.data = nstr;

  colStruct.colDataType = column.colType.colDataType;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;
  oids[colStruct.dataOid] = colStruct.dataOid;
  // oidsToFlush.push_back(colStruct.dataOid);
  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  cscColTypeList.push_back(column.colType);

  for (unsigned int i = 0; i < roList.size(); i++)
  {
    aColList.push_back(colTuple);
  }

  colValuesList.push_back(aColList);

  // get start dbroot for this PM.
  // int PMNum = Config::getLocalModuleID();
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;

  // oam.getDbroots(PMNum);
  // dbRoot will be the first dbroot on this pm. dbrootCnt will be how many dbroots on this PM.
  CalpontSystemCatalog::OID oid = 1021;

  for (unsigned int i = 0; i < roList.size(); i++)
  {
    convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);

    aExtentinfo.dbRoot = dbRoot;
    aExtentinfo.partition = partition;
    aExtentinfo.segment = segment;

    if (extentsinfo.empty())
      extentsinfo.push_back(aExtentinfo);
    else if (extentsinfo.back() != aExtentinfo)
      extentsinfo.push_back(aExtentinfo);

    ridList.push_back(roList[i].rid);
  }

  std::vector<WriteEngine::RIDList> ridLists;
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  ridLists.push_back(ridList);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  fWEWrapper.setTransId(txnID);
  fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    fWEWrapper.flushDataFiles(rc, txnID, oids);
    fWEWrapper.confirmTransaction(txnID);

    if (rc == 0)
      fWEWrapper.endTransaction(txnID, true);
    else
      fWEWrapper.endTransaction(txnID, false);
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  return rc;
}

uint8_t WE_DDLCommandProc::updateSystableEntryForSysColumn(int32_t sessionID, uint32_t txnID,
                                                           const DDLColumn& column, const std::string& value,
                                                           const std::string& oldValue,
                                                           execplan::CalpontSystemCatalog::RIDList& roList,
                                                           std::string& err)
{
  int rc = 0;

  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;
  WriteEngine::ColTuple colTuple;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  WriteEngine::DctnryTuple dictTuple;
  dictTuple.isNull = false;

  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = false;

  if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
      (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column.colType.colWidth;
  }

  colStruct.colDataType = column.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  oids[colStruct.dataOid] = colStruct.dataOid;

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  cscColTypeList.push_back(column.colType);

  for (unsigned int i = 0; i < roList.size(); i++)
  {
    aColList.push_back(colTuple);
  }

  colValuesList.push_back(aColList);

  // It's the same string for each column, so we just need one dictionary struct
  void* dictTuplePtr = static_cast<void*>(&dictTuple);
  memset(dictTuplePtr, 0, sizeof(dictTuple));
  dictTuple.sigValue = (unsigned char*)value.c_str();
  dictTuple.sigSize = value.length();
  dictTuple.isNull = false;
  dctColList = dictTuple;
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);
  CalpontSystemCatalog::OID oid = 1021;
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;

  for (unsigned int i = 0; i < roList.size(); i++)
  {
    convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);

    aExtentinfo.dbRoot = dbRoot;
    aExtentinfo.partition = partition;
    aExtentinfo.segment = segment;

    if (extentsinfo.empty())
      extentsinfo.push_back(aExtentinfo);
    else if (extentsinfo.back() != aExtentinfo)
      extentsinfo.push_back(aExtentinfo);

    ridList.push_back(roList[i].rid);
  }

  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + value;
  }

  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  return rc;
}

uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, txnID;
  std::string schema, oldTablename, newTablename, newSchema;

  bs >> sessionID;
  bs >> txnID;
  bs >> schema;
  bs >> oldTablename;
  bs >> newTablename;
  bs >> newSchema;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = oldTablename;

  CalpontSystemCatalog::RIDList roList;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  try
  {
    roList = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  // Build colStructs for SYSTABLE
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;
  DDLColumn column;
  findColumnData(sessionID, tableName, TABLENAME_COL, column);
  rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newTablename, oldTablename, roList, err);

  if (newSchema != schema && rc == NO_ERROR)
  {
    findColumnData(sessionID, tableName, SCHEMA_COL, column);
    rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newSchema, schema, roList, err);
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);
  // cout << "rename:syscolumn is updated" << endl;
  return rc;
}

uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32, autoVal;
  std::string schema, tablename;
  int txnID;

  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;
  bs >> autoVal;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = tablename;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  CalpontSystemCatalog::ROPair ropair;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  try
  {
    ropair = systemCatalogPtr->tableRID(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  if (ropair.objnum < 0)
  {
    err = "No such table: " + tableName.table;
    rc = 1;
    return rc;
  }

  // now we have to prepare the various structures for the WE to update the column.

  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;
  boost::any datavalue;
  datavalue = autoVal;

  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSTABLE
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSTABLE_TABLE;
  DDLColumn column;
  findColumnData(sessionID, tableName, AUTOINC_COL, column);
  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = false;

  colStruct.colDataType = column.colType.colDataType;

  colTuple.data = datavalue;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  colStructs.push_back(colStruct);
  cscColTypeList.push_back(column.colType);
  oids[colStruct.dataOid] = colStruct.dataOid;
  // oidsToFlush.push_back(colStruct.dataOid);
  dctnryStructList.push_back(dctnryStruct);
  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;

  WriteEngine::DctnryTuple dctnryTuple;
  dctColList = dctnryTuple;
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // In this case, there's only 1 row, so only one one extent, but keep it generic...
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;
  CalpontSystemCatalog::OID oid = 1003;
  convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);

  ridList.push_back(ropair.rid);
  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);
  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;

  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    colExtentsColType.push_back(cscColTypeList);
    dctnryExtentsStruct.push_back(dctnryStructList);
  }

  // call the write engine to update the row
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;
  }

  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //		cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}

uint8_t WE_DDLCommandProc::updateSystableEntryForSysTable(int32_t sessionID, uint32_t txnID,
                                                          const DDLColumn& column, const std::string& value,
                                                          const std::string& oldValue,
                                                          CalpontSystemCatalog::ROPair ropair,
                                                          std::string& err)
{
  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;

  WriteEngine::ColTuple colTuple;
  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;

  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;

  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = true;

  colStruct.colDataType = column.colType.colDataType;
  // Tokenize the data value
  WriteEngine::DctnryStruct dictStruct;
  dictStruct.dctnryOid = column.colType.ddn.dictOID;
  dictStruct.columnOid = column.colType.columnOID;
  WriteEngine::DctnryTuple dictTuple;
  dictTuple.isNull = false;
  dictTuple.sigValue = (unsigned char*)value.c_str();
  dictTuple.sigSize = value.length();

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;

  dctColList = dictTuple;
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // In this case, there's only 1 row, so only one one extent, but keep it generic...
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;
  CalpontSystemCatalog::OID oid = 1003;

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);

  ridList.push_back(ropair.rid);
  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);
  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;

  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  int rc =
      fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                 ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + oldValue;
    int rc1 = 0;

    if (idbdatafile::IDBPolicy::useHdfs())
    {
      rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

      if ((rc == 0) && (rc1 == 0))
      {
        rc1 = fWEWrapper.confirmTransaction(txnID);

        if (rc1 == NO_ERROR)
          rc1 = fWEWrapper.endTransaction(txnID, true);
        else
          fWEWrapper.endTransaction(txnID, false);
      }
      else
      {
        fWEWrapper.endTransaction(txnID, false);
      }
    }

    if (rc == 0)
      rc = rc1;

    if (rc != 0)
      return rc;
  }

  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  return rc;
}

uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err)
{
  uint8_t rc;
  uint32_t sessionID, tmp32, txnID;
  std::string schema, oldTablename, newTablename, newSchema;

  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> oldTablename;
  bs >> newTablename;
  bs >> newSchema;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = oldTablename;

  CalpontSystemCatalog::ROPair ropair;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  try
  {
    ropair = systemCatalogPtr->tableRID(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  if (ropair.objnum < 0)
  {
    err = "No such table: " + tableName.table;
    return 1;
  }

  // now we have to prepare the various structures for the WE to update the column.

  // Build colStructs for SYSTABLE
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSTABLE_TABLE;
  DDLColumn column;
  findColumnData(sessionID, tableName, TABLENAME_COL, column);

  rc = updateSystableEntryForSysTable(sessionID, txnID, column, newTablename, oldTablename, ropair, err);

  if (newSchema != schema && rc == NO_ERROR)
  {
    findColumnData(sessionID, tableName, SCHEMA_COL, column);
    rc = updateSystableEntryForSysTable(sessionID, txnID, column, newSchema, schema, ropair, err);
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);
  // cout << "rename:syscolumn is updated" << endl;
  return rc;
}

uint8_t WE_DDLCommandProc::updateSyscolumnColumnposCol(messageqcpp::ByteStream& bs, std::string& err)
{
  int rc = 0;
  int colPos;
  string schema, atableName;
  uint32_t sessionID, tmp32;
  int txnID;

  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> atableName;
  bs >> tmp32;
  colPos = tmp32;

  WriteEngine::RIDList ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColValueList colOldValuesList;
  CalpontSystemCatalog::TableName tableName;
  tableName.table = atableName;
  tableName.schema = schema;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::RIDList rids;

  try
  {
    rids = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  CalpontSystemCatalog::RIDList::const_iterator rid_iter = rids.begin();
  boost::any value;
  WriteEngine::ColTupleList colTuples;
  CalpontSystemCatalog::ColType columnType;
  CalpontSystemCatalog::ROPair colRO;

  // cout << "colpos is " << colPos << endl;
  try
  {
    while (rid_iter != rids.end())
    {
      // look up colType
      colRO = *rid_iter;
      columnType = systemCatalogPtr->colType(colRO.objnum);

      if (columnType.colPosition < colPos)
      {
        ++rid_iter;
        continue;
      }

      ridList.push_back(colRO.rid);
      value = columnType.colPosition - 1;
      WriteEngine::ColTuple colTuple;
      colTuple.data = value;
      colTuples.push_back(colTuple);
      ++rid_iter;
    }
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  colValuesList.push_back(colTuples);
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  // Find out where systable is
  rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);
  std::map<uint32_t, uint32_t> oids;

  // std::vector<BRM::OID_t>  oidsToFlush;
  if (colTuples.size() > 0)
  {
    WriteEngine::ColStructList colStructs;
    WriteEngine::ColStruct colStruct;
    WriteEngine::DctnryStructList dctnryStructList;
    WriteEngine::DctnryValueList dctnryValueList;
    WriteEngine::CSCTypesList cscColTypeList;
    CalpontSystemCatalog::ColType colType;
    // Build column structure for COLUMNPOS_COL
    colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS;
    colType.colWidth = colStruct.colWidth = 4;
    colStruct.tokenFlag = false;
    colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::INT;
    colStruct.fColDbRoot = dbRoot;

    if (idbdatafile::IDBPolicy::useHdfs())
    {
      colStruct.fCompressionType = 2;
    }

    colStructs.push_back(colStruct);
    cscColTypeList.push_back(colType);
    oids[colStruct.dataOid] = colStruct.dataOid;
    // oidsToFlush.push_back(colStruct.dataOid);
    rc = fWEWrapper.updateColumnRecs(txnID, cscColTypeList, colStructs, colValuesList, ridList,
                                     SYSCOLUMN_BASE);
  }

  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}

uint8_t WE_DDLCommandProc::fillNewColumn(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t tmp32;
  uint8_t tmp8;
  int txnID;
  OID dataOid, dictOid, refColOID;
  CalpontSystemCatalog::ColDataType dataType, refColDataType;
  bool autoincrement;
  int dataWidth, scale, precision, compressionType, refColWidth, refCompressionType;
  string defaultValStr;
  ColTuple defaultVal;
  long timeZone;

  bs >> tmp32;
  txnID = tmp32;
  bs >> tmp32;
  dataOid = tmp32;
  bs >> tmp32;
  dictOid = tmp32;
  bs >> tmp8;
  dataType = (CalpontSystemCatalog::ColDataType)tmp8;
  bs >> tmp8;
  autoincrement = (tmp8 != 0);
  bs >> tmp32;
  dataWidth = tmp32;
  bs >> tmp32;
  scale = tmp32;
  bs >> tmp32;
  precision = tmp32;
  bs >> defaultValStr;
  bs >> tmp8;
  compressionType = tmp8;
  bs >> tmp32;
  refColOID = tmp32;
  bs >> tmp8;
  refColDataType = (CalpontSystemCatalog::ColDataType)tmp8;
  bs >> tmp32;
  refColWidth = tmp32;
  bs >> tmp8;
  refCompressionType = tmp8;
  messageqcpp::ByteStream::octbyte timeZoneTemp;
  bs >> timeZoneTemp;
  timeZone = timeZoneTemp;
  // Find the fill in value
  bool isNULL = false;

  if (defaultValStr == "")
    isNULL = true;

  CalpontSystemCatalog::ColType colType;
  colType.colDataType = static_cast<CalpontSystemCatalog::ColDataType>(dataType);
  colType.colWidth = dataWidth;
  colType.scale = scale;
  colType.precision = precision;
  bool pushWarning = false;
  defaultVal.data = colType.convertColumnData(defaultValStr, pushWarning, timeZone, isNULL, false, false);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);
  std::map<uint32_t, uint32_t> oids;
  oids[dataOid] = dataOid;
  oids[refColOID] = refColOID;
  rc = fWEWrapper.fillColumn(txnID, dataOid, colType, defaultVal, refColOID, refColDataType, refColWidth,
                             refCompressionType, isNULL, compressionType, defaultValStr, dictOid,
                             autoincrement);

  if (rc != 0)
  {
    WErrorCodes ec;
    err = ec.errorString(rc);
  }

  purgeFDCache();
  return rc;
}

uint8_t WE_DDLCommandProc::writeTruncateLog(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t tableOid, numOid, tmp32;
  bs >> tableOid;
  bs >> numOid;
  std::vector<uint32_t> oids;

  for (uint32_t i = 0; i < numOid; i++)
  {
    bs >> tmp32;
    oids.push_back(tmp32);
  }

  string prefix;
  config::Config* config = config::Config::makeConfig();
  prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    rc = 1;
    return rc;
  }

  uint64_t pos = prefix.find_last_of("/");
  std::string DDLLogFileName;

  if (pos != string::npos)
  {
    DDLLogFileName = prefix.substr(0, pos + 1);  // Get the file path
  }
  else
  {
    err = "Cannot find the dbrm directory for the DDL log file";
    rc = 1;
    return rc;
  }

  std::ostringstream oss;
  oss << tableOid;
  DDLLogFileName += "DDL_TRUNCATETABLE_Log_" + oss.str();
  boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
      IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));

  if (!DDLLogFile)
  {
    err = "DDL truncate table log file cannot be created";
    rc = 1;
    return rc;
  }

  std::ostringstream buf;

  for (unsigned i = 0; i < oids.size(); i++)
    buf << oids[i] << std::endl;

  std::string tmp(buf.str());
  DDLLogFile->write(tmp.c_str(), tmp.size());

  // DDLLogFile is a scoped_ptr, will be closed after return.

  return rc;
}

uint8_t WE_DDLCommandProc::writeDropPartitionLog(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t tableOid, numParts, numOid, tmp32;
  bs >> tableOid;
  std::set<BRM::LogicalPartition> partitionNums;
  bs >> numParts;
  BRM::LogicalPartition lp;

  for (uint32_t i = 0; i < numParts; i++)
  {
    lp.unserialize(bs);
    partitionNums.insert(lp);
  }

  bs >> numOid;
  std::vector<uint32_t> oids;

  for (uint32_t i = 0; i < numOid; i++)
  {
    bs >> tmp32;
    oids.push_back(tmp32);
  }

  string prefix;
  config::Config* config = config::Config::makeConfig();
  prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    rc = 1;
    return rc;
  }

  uint64_t pos = prefix.find_last_of("/");
  std::string DDLLogFileName;

  if (pos != string::npos)
  {
    DDLLogFileName = prefix.substr(0, pos + 1);  // Get the file path
  }
  else
  {
    err = "Cannot find the dbrm directory for the DDL drop partitions log file";
    rc = 1;
    return rc;
  }

  std::ostringstream oss;
  oss << tableOid;
  DDLLogFileName += "DDL_DROPPARTITION_Log_" + oss.str();
  boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
      IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));

  if (!DDLLogFile)
  {
    err = "DDL drop partitions log file cannot be created";
    rc = 1;
    return rc;
  }

  std::ostringstream buf;
  // @SN write partition numbers to the log file, separated by space
  set<BRM::LogicalPartition>::const_iterator it;

  for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
    buf << (*it) << endl;

  // -1 indicates the end of partition list
  BRM::LogicalPartition end(-1, -1, -1);
  buf << end << endl;

  for (unsigned i = 0; i < oids.size(); i++)
    buf << oids[i] << std::endl;

  std::string tmp(buf.str());
  DDLLogFile->write(tmp.c_str(), tmp.size());

  return rc;
}

uint8_t WE_DDLCommandProc::writeDropTableLog(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t tableOid, numOid, tmp32;
  bs >> tableOid;

  bs >> numOid;
  std::vector<uint32_t> oids;

  for (uint32_t i = 0; i < numOid; i++)
  {
    bs >> tmp32;
    oids.push_back(tmp32);
  }

  string prefix;
  config::Config* config = config::Config::makeConfig();
  prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    rc = 1;
    return rc;
  }

  uint64_t pos = prefix.find_last_of("/");
  std::string DDLLogFileName;

  if (pos != string::npos)
  {
    DDLLogFileName = prefix.substr(0, pos + 1);  // Get the file path
  }
  else
  {
    err = "Cannot find the dbrm directory for the DDL drop partitions log file";
    rc = 1;
    return rc;
  }

  std::ostringstream oss;
  oss << tableOid;
  DDLLogFileName += "DDL_DROPTABLE_Log_" + oss.str();
  boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
      IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));

  if (!DDLLogFile)
  {
    err = "DDL drop table log file cannot be created";
    rc = 1;
    return rc;
  }

  std::ostringstream buf;

  for (unsigned i = 0; i < oids.size(); i++)
    buf << oids[i] << std::endl;

  std::string tmp(buf.str());
  DDLLogFile->write(tmp.c_str(), tmp.size());

  return rc;
}

uint8_t WE_DDLCommandProc::deleteDDLLog(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t tableOid, fileType;
  bs >> fileType;
  bs >> tableOid;
  string prefix;
  config::Config* config = config::Config::makeConfig();
  prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    rc = 1;
    return rc;
  }

  uint64_t pos = prefix.find_last_of("/");
  std::string DDLLogFileName;

  if (pos != string::npos)
  {
    DDLLogFileName = prefix.substr(0, pos + 1);  // Get the file path
  }
  else
  {
    err = "Cannot find the dbrm directory for the DDL drop partitions log file";
    rc = 1;
    return rc;
  }

  std::ostringstream oss;
  oss << tableOid;

  switch (fileType)
  {
    case DROPTABLE_LOG:
    {
      DDLLogFileName += "DDL_DROPTABLE_Log_" + oss.str();
      break;
    }

    case DROPPART_LOG:
    {
      DDLLogFileName += "DDL_DROPPARTITION_Log_" + oss.str();
      break;
    }

    case TRUNCATE_LOG:
    {
      DDLLogFileName += "DDL_TRUNCATETABLE_Log_" + oss.str();
      break;
    }

    default: break;
  }

  IDBPolicy::remove(DDLLogFileName.c_str());

  return rc;
}

uint8_t WE_DDLCommandProc::fetchDDLLog(ByteStream& bs, std::string& err)
{
  int rc = 0;

  // Find the ddl log files under DBRMRoot directory
  string prefix, ddlLogDir;
  config::Config* config = config::Config::makeConfig();
  prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    rc = 1;
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    return rc;
  }

  uint64_t pos = prefix.find_last_of("/");

  if (pos != string::npos)
  {
    ddlLogDir = prefix.substr(0, pos + 1);  // Get the file path
  }
  else
  {
    rc = 1;
    err = "Cannot find the dbrm directory for the DDL log file";
    return rc;
  }

  boost::filesystem::path filePath;
  filePath = fs::system_complete(fs::path(ddlLogDir));

  if (!fs::exists(filePath))
  {
    rc = 1;
    err = "\nDDL log file path is Not found: ";
    return rc;
  }

  std::vector<string> fileNames;

  if (fs::is_directory(filePath))
  {
    fs::directory_iterator end_iter;

    for (fs::directory_iterator dir_itr(filePath); dir_itr != end_iter; ++dir_itr)
    {
      try
      {
        if (!fs::is_directory(*dir_itr))
        {
#if BOOST_VERSION >= 105200
          fileNames.push_back(dir_itr->path().generic_string());
#else
          fileNames.push_back(dir_itr->string());
#endif
        }
      }
      catch (std::exception& ex)
      {
        err = ex.what();
        rc = 1;
        return rc;
      }
    }
  }

  CalpontSystemCatalog::OID fileoid;
  string tableName;
  bs.restart();

  for (unsigned i = 0; i < fileNames.size(); i++)
  {
    pos = fileNames[i].find("DDL_DROPTABLE_Log_");

    if (pos != string::npos)
    {
      // Read the file to get oids
      // cout << "Found file " << fileNames[i] << endl;
      boost::scoped_ptr<idbdatafile::IDBDataFile> ddlLogFile(IDBDataFile::open(
          IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0));

      if (!ddlLogFile)
        continue;

      // find the table oid
      pos = fileNames[i].find_last_of("_");
      string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1);
      char* ep = NULL;
      uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
      bs << tableOid;
      bs << (uint32_t)DROPTABLE_LOG;
      std::vector<CalpontSystemCatalog::OID> oidList;

      ssize_t fileSize = ddlLogFile->size();
      boost::scoped_array<char> buf(new char[fileSize]);

      if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
        return (uint8_t)ERR_FILE_READ;

      std::istringstream strbuf(string(buf.get(), fileSize));

      while (strbuf >> fileoid)
        oidList.push_back(fileoid);

      bs << (uint32_t)oidList.size();

      for (unsigned j = 0; j < oidList.size(); j++)
      {
        bs << (uint32_t)oidList[j];
      }

      bs << (uint32_t)0;
    }
    else  // Find drop partition log file
    {
      pos = fileNames[i].find("DDL_DROPPARTITION_Log_");

      if (pos != string::npos)
      {
        boost::scoped_ptr<idbdatafile::IDBDataFile> ddlLogFile(IDBDataFile::open(
            IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0));
        BRM::LogicalPartition partition;
        vector<BRM::LogicalPartition> partitionNums;
        // find the table oid
        pos = fileNames[i].find_last_of("_");
        string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1);
        char* ep = NULL;
        uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
        bs << tableOid;
        bs << (uint32_t)DROPPART_LOG;

        ssize_t fileSize = ddlLogFile->size();
        boost::scoped_array<char> buf(new char[fileSize]);

        if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
          return (uint8_t)ERR_FILE_READ;

        std::istringstream strbuf(string(buf.get(), fileSize));

        while (strbuf >> partition)
        {
          if (partition.dbroot == (uint16_t)-1)
            break;

          partitionNums.push_back(partition);
        }

        std::vector<CalpontSystemCatalog::OID> oidPartList;

        while (strbuf >> fileoid)
          oidPartList.push_back(fileoid);

        bs << (uint32_t)oidPartList.size();

        for (unsigned j = 0; j < oidPartList.size(); j++)
        {
          bs << (uint32_t)oidPartList[j];
        }

        bs << (uint32_t)partitionNums.size();

        for (unsigned j = 0; j < partitionNums.size(); j++)
        {
          partitionNums[j].serialize(bs);
        }
      }
      else  // find truncate table log file
      {
        pos = fileNames[i].find("DDL_TRUNCATETABLE_Log_");

        if (pos != string::npos)
        {
          boost::scoped_ptr<idbdatafile::IDBDataFile> ddlLogFile(IDBDataFile::open(
              IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0));

          if (!ddlLogFile)
          {
            continue;
          }

          // find the table oid
          pos = fileNames[i].find_last_of("_");
          string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1);
          char* ep = NULL;
          uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
          bs << tableOid;
          bs << (uint32_t)TRUNCATE_LOG;
          std::vector<CalpontSystemCatalog::OID> oidList;

          ssize_t fileSize = ddlLogFile->size();
          boost::scoped_array<char> buf(new char[fileSize]);

          if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
            return (uint8_t)ERR_FILE_READ;

          std::istringstream strbuf(string(buf.get(), fileSize));

          while (strbuf >> fileoid)
            oidList.push_back(fileoid);

          bs << (uint32_t)oidList.size();

          for (unsigned j = 0; j < oidList.size(); j++)
          {
            bs << (uint32_t)oidList[j];
          }

          bs << (uint32_t)0;
        }
      }
    }
  }

  return rc;
}

uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs, std::string& err)
{
  // Will update five columns: columnname, defaultvalue, nullable, autoincrement, nextvalue.
  int rc = 0;
  uint32_t tmp32;
  string schema, tableName, colName, defaultvalue;
  int txnID;
  uint32_t sessionID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tableName;
  bs >> colName;
  bs >> defaultvalue;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  CalpontSystemCatalog::TableName atableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = schema;
  tableColName.table = tableName;
  tableColName.column = colName;
  CalpontSystemCatalog::ROPair ropair;

  try
  {
    ropair = systemCatalogPtr->columnRID(tableColName);

    if (ropair.objnum < 0)
    {
      ostringstream oss;
      oss << "No such column: " << tableColName;
      throw std::runtime_error(oss.str().c_str());
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }
  catch (...)
  {
    err = "renameColumn:Unknown exception caught";
    rc = 1;
    return rc;
  }

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  std::vector<WriteEngine::RID> ridList;
  ridList.push_back(ropair.rid);
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList1;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;

  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSCOLUMN
  atableName.schema = CALPONT_SCHEMA;
  atableName.table = SYSCOLUMN_TABLE;
  DDLColumn column;
  findColumnData(sessionID, atableName, DEFAULTVAL_COL, column);  // DEFAULTVAL_COL column

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.startTransaction(txnID);
  // Build DEFAULTVAL_COL structure
  WriteEngine::ColTupleList aColList;
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = false;

  if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
      (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column.colType.colWidth;
  }

  colStruct.colDataType = column.colType.colDataType;

  if (colStruct.tokenFlag)
  {
    WriteEngine::DctnryStruct dictStruct;
    dictStruct.dctnryOid = column.colType.ddn.dictOID;
    dictStruct.columnOid = column.colType.columnOID;

    if (defaultvalue.length() <= 0)  // null token
    {
      WriteEngine::Token nullToken;
      colTuple.data = nullToken;
    }
    else
    {
      WriteEngine::DctnryTuple dictTuple;
      dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
      dictTuple.sigSize = defaultvalue.length();
      dictTuple.isNull = false;
      int error = NO_ERROR;

      if (NO_ERROR !=
          (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false)))  // @bug 5572 HDFS tmp file
      {
        WErrorCodes ec;
        throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
      }

      WriteEngine::Token aToken = dictTuple.token;
      colTuple.data = aToken;
    }
  }

  colStruct.colDataType = column.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  dctnryStructList.push_back(dctnryStruct);
  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);
  WriteEngine::DctnryTuple dctnryTuple;

  if (defaultvalue.length() > 0)
  {
    dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
    dctnryTuple.sigSize = defaultvalue.length();
    dctnryTuple.isNull = false;
  }
  else
  {
    dctnryTuple.isNull = true;
  }

  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);

  // In this case, there's only 1 row, so only one one extent, but keep it generic...
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;

  convertRidToColumn(ropair.rid, dbRoot, partition, segment, 1021);

  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;

  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row

  if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                             colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList,
                                             SYSCOLUMN_BASE))
  {
    err = "WE: Update failed on: " + atableName.table;
    rc = 1;
  }

  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  // flush syscat cahche
  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //	cacheutils::flushOIDsFromCache(oidsToFlush);

  return rc;
}

uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& bs, std::string& err)
{
  // Will update five columns: columnname, defaultvalue, nullable, autoincrement, nextvalue.
  int rc = 0;
  uint64_t nextVal;
  uint32_t tmp32, nullable;
  string schema, tableName, colOldname, autoinc, colNewName, defaultvalue;
  int txnID;
  uint32_t sessionID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tableName;
  bs >> colOldname;
  bs >> colNewName;
  bs >> autoinc;
  bs >> nextVal;
  bs >> nullable;
  bs >> defaultvalue;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  CalpontSystemCatalog::TableName atableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = schema;
  tableColName.table = tableName;
  tableColName.column = colOldname;
  CalpontSystemCatalog::ROPair ropair;

  try
  {
    ropair = systemCatalogPtr->columnRID(tableColName);

    if (ropair.objnum < 0)
    {
      ostringstream oss;
      oss << "No such column: " << tableColName;
      throw std::runtime_error(oss.str().c_str());
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }
  catch (...)
  {
    err = "renameColumn:Unknown exception caught";
    rc = 1;
    return rc;
  }

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  std::vector<WriteEngine::RID> ridList;
  ridList.push_back(ropair.rid);
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList1;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t>  oidsToFlush;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;
  boost::any datavalue;
  datavalue = colNewName;

  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSCOLUMN
  atableName.schema = CALPONT_SCHEMA;
  atableName.table = SYSCOLUMN_TABLE;
  DDLColumn column1, column2, column3, column4, column5;
  findColumnData(sessionID, atableName, COLNAME_COL, column1);     // COLNAME_COL column
  findColumnData(sessionID, atableName, AUTOINC_COL, column2);     // AUTOINC_COL column
  findColumnData(sessionID, atableName, NEXTVALUE_COL, column3);   // NEXTVALUE_COL column
  findColumnData(sessionID, atableName, NULLABLE_COL, column4);    // NULLABLE_COL column
  findColumnData(sessionID, atableName, DEFAULTVAL_COL, column5);  // DEFAULTVAL_COL column

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);
  // Build COLNAME_COL structure
  colStruct.dataOid = column1.oid;
  colStruct.colWidth = column1.colType.colWidth > 8 ? 8 : column1.colType.colWidth;
  colStruct.tokenFlag = false;

  if ((column1.colType.colDataType == CalpontSystemCatalog::CHAR && column1.colType.colWidth > 8) ||
      (column1.colType.colDataType == CalpontSystemCatalog::VARCHAR && column1.colType.colWidth > 7) ||
      (column1.colType.colDataType == CalpontSystemCatalog::VARBINARY && column1.colType.colWidth > 7) ||
      (column1.colType.colDataType == CalpontSystemCatalog::BLOB && column1.colType.colWidth > 7) ||
      (column1.colType.colDataType == CalpontSystemCatalog::TEXT && column1.colType.colWidth > 7) ||
      (column1.colType.colDataType == CalpontSystemCatalog::DECIMAL && column1.colType.precision > 18) ||
      (column1.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column1.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column1.colType.colWidth;
  }

  colStruct.colDataType = column1.colType.colDataType;

  if (colStruct.tokenFlag)
  {
    WriteEngine::DctnryStruct dictStruct;

    if (idbdatafile::IDBPolicy::useHdfs())
    {
      dictStruct.fCompressionType = 2;
    }

    dictStruct.dctnryOid = column1.colType.ddn.dictOID;
    dictStruct.columnOid = column1.colType.columnOID;
    WriteEngine::DctnryTuple dictTuple;
    dictTuple.sigValue = (unsigned char*)colNewName.c_str();
    dictTuple.sigSize = colNewName.length();
    dictTuple.isNull = false;
    int error = NO_ERROR;

    if (NO_ERROR !=
        (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false)))  // @bug 5572 HDFS tmp file
    {
      WErrorCodes ec;
      err = ec.errorString(error);
      rc = error;
      return rc;
    }

    WriteEngine::Token aToken = dictTuple.token;
    colTuple.data = aToken;
  }
  else
    colTuple.data = datavalue;

  colStruct.colDataType = column1.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column1.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column1.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column1.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  dctnryStructList.push_back(dctnryStruct);
  aColList1.push_back(colTuple);
  colValuesList.push_back(aColList1);
  WriteEngine::DctnryTuple dctnryTuple;
  boost::to_lower(colNewName);
  dctnryTuple.sigValue = (unsigned char*)colNewName.c_str();
  dctnryTuple.sigSize = colNewName.length();
  dctnryTuple.isNull = false;
  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // Build AUTOINC_COL structure
  WriteEngine::ColTupleList aColList2;
  colStruct.dataOid = column2.oid;
  colStruct.colWidth = column2.colType.colWidth > 8 ? 8 : column2.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column2.colType.colDataType;

  colTuple.data = autoinc;

  colStruct.colDataType = column2.colType.colDataType;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column2.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  dctnryStructList.push_back(dctnryStruct);
  aColList2.push_back(colTuple);
  colValuesList.push_back(aColList2);
  dctnryTuple.isNull = true;
  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // Build NEXTVALUE_COL structure
  WriteEngine::ColTupleList aColList3;
  colStruct.dataOid = column3.oid;
  colStruct.colWidth = column3.colType.colWidth > 8 ? 8 : column3.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column3.colType.colDataType;

  colTuple.data = nextVal;

  colStruct.colDataType = column3.colType.colDataType;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column3.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  dctnryStructList.push_back(dctnryStruct);
  aColList3.push_back(colTuple);
  colValuesList.push_back(aColList3);

  dctnryTuple.isNull = true;
  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // Build NULLABLE_COL structure
  WriteEngine::ColTupleList aColList4;
  colStruct.dataOid = column4.oid;
  colStruct.colWidth = column4.colType.colWidth > 8 ? 8 : column4.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column4.colType.colDataType;

  colTuple.data = nullable;

  colStruct.colDataType = column4.colType.colDataType;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column4.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  dctnryStructList.push_back(dctnryStruct);
  aColList4.push_back(colTuple);
  colValuesList.push_back(aColList4);
  dctnryTuple.isNull = true;
  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // Build DEFAULTVAL_COL structure
  WriteEngine::ColTupleList aColList5;
  colStruct.dataOid = column5.oid;
  colStruct.colWidth = column5.colType.colWidth > 8 ? 8 : column5.colType.colWidth;
  colStruct.tokenFlag = false;

  if ((column5.colType.colDataType == CalpontSystemCatalog::CHAR && column5.colType.colWidth > 8) ||
      (column5.colType.colDataType == CalpontSystemCatalog::VARCHAR && column5.colType.colWidth > 7) ||
      (column5.colType.colDataType == CalpontSystemCatalog::VARBINARY && column5.colType.colWidth > 7) ||
      (column5.colType.colDataType == CalpontSystemCatalog::BLOB && column5.colType.colWidth > 7) ||
      (column5.colType.colDataType == CalpontSystemCatalog::TEXT && column5.colType.colWidth > 7) ||
      (column5.colType.colDataType == CalpontSystemCatalog::DECIMAL && column5.colType.precision > 18) ||
      (column5.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column5.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column5.colType.colWidth;
  }

  colStruct.colDataType = column5.colType.colDataType;

  if (colStruct.tokenFlag)
  {
    WriteEngine::DctnryStruct dictStruct;

    if (idbdatafile::IDBPolicy::useHdfs())
    {
      colStruct.fCompressionType = 2;
      dictStruct.fCompressionType = 2;
    }

    dictStruct.dctnryOid = column5.colType.ddn.dictOID;
    dictStruct.columnOid = column5.colType.columnOID;

    if (defaultvalue.length() <= 0)  // null token
    {
      WriteEngine::Token nullToken;
      colTuple.data = nullToken;
    }
    else
    {
      WriteEngine::DctnryTuple dictTuple;
      dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
      dictTuple.sigSize = defaultvalue.length();
      dictTuple.isNull = false;
      int error = NO_ERROR;

      if (NO_ERROR !=
          (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false)))  // @bug 5572 HDFS tmp file
      {
        WErrorCodes ec;
        throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
      }

      WriteEngine::Token aToken = dictTuple.token;
      colTuple.data = aToken;
    }
  }

  fWEWrapper.flushDataFiles(rc, txnID, oids);

  colStruct.colDataType = column5.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column5.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column5.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column5.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  aColList5.push_back(colTuple);
  colValuesList.push_back(aColList5);

  if (defaultvalue.length() > 0)
  {
    dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
    dctnryTuple.sigSize = defaultvalue.length();
    dctnryTuple.isNull = false;
  }
  else
  {
    dctnryTuple.isNull = true;
  }

  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);

  // In this case, there's only 1 row, so only one one extent, but keep it generic...
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;

  convertRidToColumn(ropair.rid, dbRoot, partition, segment, 1021);

  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;

  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                             colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList,
                                             SYSCOLUMN_BASE))
  {
    err = "WE: Update failed on: " + atableName.table;
    rc = 1;
  }

  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  // flush syscat cahche
  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //		cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}

uint8_t WE_DDLCommandProc::dropPartitions(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t size, i;
  uint32_t tmp32;
  std::vector<OID> dataOids;
  std::vector<BRM::PartitionInfo> partitions;

  bs >> size;

  for (i = 0; i < size; ++i)
  {
    bs >> tmp32;
    dataOids.push_back(tmp32);
  }

  bs >> size;
  BRM::PartitionInfo pi;

  for (i = 0; i < size; ++i)
  {
    pi.unserialize(bs);
    partitions.push_back(pi);
  }

  try
  {
    rc = fWEWrapper.deletePartitions(dataOids, partitions);
  }
  catch (...)
  {
    err = "WE: Error removing files ";
    rc = 1;
  }

  return rc;
}

void WE_DDLCommandProc::purgeFDCache()
{
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(SYSCOLUMN_BASE);
    ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
    ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
    ColExtsInfo::iterator aIt;
    std::vector<BRM::FileInfo> files;
    BRM::FileInfo aFile;
    vector<BRM::LBID_t> lbidList;
    BRM::LBID_t startLbid;

    while (it != colsExtsInfoMap.end())
    {
      aIt = (it->second).begin();
      aFile.oid = it->first;

      while (aIt != (it->second).end())
      {
        aFile.partitionNum = aIt->partNum;
        aFile.dbRoot = aIt->dbRoot;
        aFile.segmentNum = aIt->segNum;
        aFile.compType = aIt->compType;
        files.push_back(aFile);
        fDbrm.lookupLocalStartLbid(aFile.oid, aFile.partitionNum, aFile.segmentNum, aIt->hwm, startLbid);
        // cout <<"Added to files oid:dbroot:part:seg:compType = " <<
        // aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
        //<<":"<<aFile.compType <<endl;
        aIt++;
      }

      it++;
    }

    cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
    fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);
  }

  TableMetaData::removeTableMetaData(SYSCOLUMN_BASE);
}

}  // namespace WriteEngine
